package com.k2data.example;


import com.k2data.common.Constants;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.k2data.server.Server;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;


/**
 * Inbound handler for text records received from a client channel.
 *
 * <p>For each inbound message it:
 * <ul>
 *   <li>acknowledges blank messages with {@code Constants.RESPONSE_SUCCESS};</li>
 *   <li>answers heart-beat probes with {@code Constants.RESPONSE_HEART_BEAT_TEST_SUCCESS};</li>
 *   <li>rejects records that do not start with {@code "("} or {@code ";("} with
 *       {@code Constants.RESPONSE_ERR};</li>
 *   <li>otherwise strips the first {@code '('}, splits the remainder on {@code '$'} and hands
 *       every piece to {@code Server.getClient().sendMessageSync(...)} (a Kafka send, according
 *       to the log messages), replying with success only if every piece was sent.</li>
 * </ul>
 *
 * <p>NOTE(review): the message is read through {@code msg.toString()}, so this handler presumably
 * sits behind a decoder that produces {@code String} frames - confirm against the pipeline setup.
 */
public class StringHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(StringHandler.class);

    /**
     * Processes one inbound message and always writes exactly one response to the client.
     *
     * @param ctx the channel context used to write the response
     * @param msg the inbound message; only its {@code toString()} form is used
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        long startMillis = System.currentTimeMillis();
        if (msg == null || msg.toString().trim().isEmpty()) {
            // The protocol was changed to terminate records with ')'. A message that ends with a
            // double parenthesis may therefore yield an empty record; acknowledge it with the
            // success response so the client moves on to the next record.
            reply(ctx, Constants.RESPONSE_SUCCESS);
            return;
        }

        String record = msg.toString();

        // Heart-beat probes are answered directly and never forwarded.
        if (record.equalsIgnoreCase(Constants.CLIENT_HEART_BEAT_TEST_CONTENT)) {
            LOG.info("client {} send a heart beat test : {}", ctx.channel().remoteAddress(), record);
            reply(ctx, Constants.RESPONSE_HEART_BEAT_TEST_SUCCESS);
            return;
        }

        // A valid record must start with '(' (optionally preceded by ';').
        if (!record.startsWith("(") && !record.startsWith(";(")) {
            LOG.error("received an error record, not start with '('.");
            reply(ctx, Constants.RESPONSE_ERR);
            return;
        }

        // Drop the first '(' and split the remainder into individual records on '$'.
        String recordBody = record.replaceFirst("\\(", "");
        String[] records = recordBody.split("\\$");
        for (String rec : records) {
            try {
                Server.getClient().sendMessageSync(Constants.HN_7sDATA_KEY + getRandomNum(), rec);
                LOG.debug("send 1 record to kafka success.");
            } catch (ExecutionException e) {
                LOG.error("ExecutionException: send msg to kafka is failed.", e);
                replySendFailure(ctx);
                return;
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                LOG.error("InterruptedException: send msg to kafka is failed.", e);
                replySendFailure(ctx);
                return;
            }
        }

        // Every piece was sent: tell the client the record was received.
        LOG.info("receive cost: {}", System.currentTimeMillis() - startMillis);
        reply(ctx, Constants.RESPONSE_SUCCESS);
    }

    /**
     * Writes {@code response} to the client and flushes the channel.
     *
     * <p>NOTE(review): {@code getBytes()} uses the platform default charset; kept as-is so the
     * bytes on the wire do not change, but an explicit charset would be safer.
     */
    private void reply(ChannelHandlerContext ctx, String response) {
        ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
    }

    /** Sends the error response after a failed send and logs that it was returned. */
    private void replySendFailure(ChannelHandlerContext ctx) {
        reply(ctx, Constants.RESPONSE_ERR);
        LOG.debug("return {} - for fail send to kafka.", Constants.RESPONSE_ERR);
    }

    /** Returns a random five-digit number (10000 to 99999) as a string, used as a key suffix. */
    private String getRandomNum() {
        return String.valueOf(ThreadLocalRandom.current().nextInt(10000, 100000));
    }

    /**
     * Logs the remote address when a client connects, then propagates the event.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LOG.info("RemoteAddress : {} active !", ctx.channel().remoteAddress());
        super.channelActive(ctx);
    }

    /**
     * Logs the remote address when a client disconnects, then propagates the event.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LOG.info("RemoteAddress : {} inactive !", ctx.channel().remoteAddress());
        // Propagate the inactive event (the previous code fired channelActive here by mistake).
        super.channelInactive(ctx);
    }

}